
package com.shockweb.service;


import java.io.UnsupportedEncodingException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.shockweb.bridge.DataAgreement;
import com.shockweb.bridge.OperationDefine;
import com.shockweb.bridge.ServiceRequest;
import com.shockweb.bridge.ServiceResult;
import com.shockweb.common.International;
import com.shockweb.common.context.ContextManager;
import com.shockweb.common.log.LogManager;
import com.shockweb.common.serializable.SerializableObject;
import com.shockweb.common.serializable.binary.BinaryReader;
import com.shockweb.service.config.ServiceConfig;
import com.shockweb.service.data.ServiceStatus;
import com.shockweb.service.exception.ServerException;
import com.shockweb.service.exception.ServiceException;
import com.shockweb.service.ioc.IocManager;
import com.shockweb.utils.Convert;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;

/**
 * 微服务端通信handler
 * 
 * @author 彭明华
 * 2018年1月3日 创建
 */
public class ServiceServerNettyHandler extends ChannelInboundHandlerAdapter {

	/**
	 * Server-side configuration; read by the request handler for the global
	 * in-flight threshold and the service timeout.
	 */
	private final ServiceConfig config;

	/**
	 * Per-service rate limiter consulted before a standard service call is invoked.
	 */
	private final ServiceThresholdManager serviceThresholdManager;

	/**
	 * Thread pool on which inbound requests are processed.
	 */
	private final ExecutorService cachedThreadPool;

	/**
	 * Creates a handler bound to the given collaborators.
	 *
	 * <p>The fields are {@code final} because they are assigned only here and are
	 * later read from worker threads of {@code cachedThreadPool}.
	 *
	 * @param config server configuration
	 * @param cachedThreadPool executor that runs the request-processing tasks
	 * @param serviceThresholdManager per-service rate limiter
	 */
	public ServiceServerNettyHandler(ServiceConfig config,ExecutorService cachedThreadPool,ServiceThresholdManager serviceThresholdManager){
		this.config = config;
		this.cachedThreadPool = cachedThreadPool;
		this.serviceThresholdManager = serviceThresholdManager;
	}
	/**
	 * Number of calls counted so far; incremented through {@link #addCalled()}.
	 */
    private static long called = 0;

    /**
     * Guards every read and write of {@link #called}.
     */
    private static final ReadWriteLock calledLock = new ReentrantReadWriteLock();

    /**
     * Increments the call counter under the write lock.
     */
    private static void addCalled() {
        // Acquire before entering try: if lock() itself failed, the finally
        // block must not attempt to unlock a lock this thread does not hold.
        calledLock.writeLock().lock();
        try {
            called++;
        } finally {
            calledLock.writeLock().unlock();
        }
    }

    /**
     * Returns the number of calls counted so far.
     *
     * @return current value of the call counter, read under the read lock
     */
    public static long getCalled(){
        calledLock.readLock().lock();
        try {
            return called;
        } finally {
            calledLock.readLock().unlock();
        }
    }
    /**
     * Number of errors counted so far; incremented through {@link #addError()}.
     */
    private static long error = 0;

    /**
     * Guards every read and write of {@link #error}.
     */
    private static final ReadWriteLock errorLock = new ReentrantReadWriteLock();

    /**
     * Increments the error counter under the write lock.
     */
    private static void addError() {
        // Acquire before entering try: if lock() itself failed, the finally
        // block must not attempt to unlock a lock this thread does not hold.
        errorLock.writeLock().lock();
        try {
            error++;
        } finally {
            errorLock.writeLock().unlock();
        }
    }

    /**
     * Returns the number of errors counted so far.
     *
     * @return current value of the error counter, read under the read lock
     */
    public static long getError(){
        errorLock.readLock().lock();
        try {
            return error;
        } finally {
            errorLock.readLock().unlock();
        }
    }
    /**
     * Number of requests currently being executed.
     */
    private static long doing = 0;

    /**
     * Guards every read and write of {@link #doing}.
     */
    private static final ReadWriteLock doingLock = new ReentrantReadWriteLock();

    /**
     * Registers one more in-flight request and checks it against a limit.
     *
     * <p>The counter is incremented unconditionally, even when the limit is
     * exceeded, so every call must be paired with one {@link #subDoing()} call
     * regardless of the returned value.
     *
     * @param doingThreshold maximum number of in-flight requests; a value of
     *        {@code 0} or less disables the check
     * @return {@code false} if the limit is enabled and the incremented counter
     *         exceeds it, {@code true} otherwise
     */
    public static boolean addDoing(long doingThreshold){
        // Acquire before entering try: if lock() itself failed, the finally
        // block must not attempt to unlock a lock this thread does not hold.
        doingLock.writeLock().lock();
        try {
            doing++;
            return !(doingThreshold > 0 && doing > doingThreshold);
        } finally {
            doingLock.writeLock().unlock();
        }
    }

    /**
     * Unregisters one in-flight request by decrementing the counter under the write lock.
     */
    public static void subDoing(){
        doingLock.writeLock().lock();
        try {
            doing--;
        } finally {
            doingLock.writeLock().unlock();
        }
    }

    /**
     * Returns the number of requests currently being executed.
     *
     * @return current value of the in-flight counter, read under the read lock
     */
    public static long getDoing(){
        doingLock.readLock().lock();
        try {
            return doing;
        } finally {
            doingLock.readLock().unlock();
        }
    }
    
    /**
     * Number of requests whose processing time exceeded the configured timeout.
     *
     * <p>Declared {@code volatile} because {@link #getTimeOut()} reads it without
     * holding any lock: this makes the latest written value visible to readers
     * and guarantees that the 64-bit read is atomic.
     */
    private static volatile long timeOut = 0;

    /**
     * Returns the number of timed-out requests counted so far.
     *
     * @return current value of the timeout counter
     */
    public static long getTimeOut(){
    	return timeOut;
    }


    
    /**
     * Key under which the request start time (milliseconds) is stored in
     * ContextManager while a request is being processed.
     */
    String key = "START_TIME";
    /**
     * Handles one inbound message from a client.
     *
     * <p>The message must be a {@link ByteBuf}; anything else is logged, released
     * and dropped. The readable bytes are copied into a byte array on the calling
     * thread and the buffer is released, then the frame is processed on
     * {@code cachedThreadPool}. The worker reads the operation code and UUID from
     * the frame header, ignores keep-alive frames, and otherwise dispatches on
     * the operation code (standard service call, asynchronous service call,
     * raw data transfer, status query, stop). Failures are counted via
     * {@code addError()}, reported back to the client and logged.
     *
     * @param ctx channel context used to reply to the client
     * @param msg inbound message, expected to be a {@link ByteBuf}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            LogManager.errorLog(this.getClass(), new ServerException("接收到非法数据,非ByteBuf数据"));
            // This handler consumes the message (it is not passed on), so it is
            // responsible for releasing it if it happens to be reference-counted.
            ReferenceCountUtil.release(msg);
            return;
        }
        ByteBuf buf = (ByteBuf) msg;
        addCalled();
        try {
            // Copy the payload so the pooled buffer can be released immediately.
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    byte operation = 0;
                    String uuid = null;
                    try {
                        operation = DataAgreement.resolutionOperation(data);
                        uuid = DataAgreement.resolutionUUID(data);
                    } catch (Throwable e) {
                        LogManager.errorLog(this.getClass(), new ServerException("接收到非法数据,非ByteBuf数据",e));
                        return;
                    }
                    if (operation == OperationDefine.ALIVE.value()) {
                        // Keep-alive frame: nothing to do and nothing to answer.
                        return;
                    }
                    ContextManager.setUuid(uuid);
                    ContextManager.putValue(key, System.currentTimeMillis());
                    // The body starts right after the operation byte and the UUID.
                    int offset = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
                    try {
                        // addDoing counts the request even when it is rejected; the
                        // finally block below always performs the matching subDoing().
                        if (!addDoing(config.getDoingThreshold())) {
                            // Global rate limit for the whole server.
                            throw new ServerException("当前服务器忙,,达到最大" + config.getDoingThreshold() + "限流设置");
                        }
                        if (operation == OperationDefine.REQ_SERVICE.value()) {
                            // Standard service call: invoke and send the result back.
                            ServiceRequest req = (ServiceRequest)Convert.convertToObject(data, offset, data.length-offset, ServiceRequest.class);
                            // NOTE(review): when addDoings returns false no subDoings is
                            // called for it - confirm addDoings does not count rejected calls.
                            if (!serviceThresholdManager.addDoings(req.getService())) {
                                throw new ServerException(req.getService() + "服务忙,,达到最大" + serviceThresholdManager.getServiceThresholds(req.getService()) + "限流设置");
                            }
                            ServiceStatus.getServices().addCalled(req.getService());
                            if (req.getContext() != null) {
                                ContextManager.putAll(req.getContext());
                            }
                            try {
                                ServiceResult result = new ServiceResult();
                                result.setResult(IocManager.invoke(req.getService(),req));
                                result.setContextParam(req.getContext());
                                sendData(ctx.channel(),OperationDefine.RES_RESULT,uuid,Convert.convertToBytes(result));
                            } finally {
                                if (req.getContext() != null) {
                                    ContextManager.remove();
                                }
                                serviceThresholdManager.subDoings(req.getService());
                            }
                        } else if (operation == OperationDefine.REQ_ASYNC_SERVICE.value()) {
                            // Asynchronous service call: acknowledge first, then invoke.
                            ServiceRequest req = (ServiceRequest)Convert.convertToObject(data, offset, data.length-offset, ServiceRequest.class);
                            sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
                            ServiceStatus.getServices().addCalled(req.getService());
                            if (req.getContext() != null) {
                                ContextManager.putAll(req.getContext());
                            }
                            try {
                                IocManager.invoke(req.getService(),req);
                            } finally {
                                if (req.getContext() != null) {
                                    ContextManager.remove();
                                }
                            }
                        } else if (operation == OperationDefine.REQ_SENDDATA.value()) {
                            // Raw data transfer: body is [int len][service][int len][method]...
                            int len = SerializableObject.bytesToInt(data,offset);
                            offset = offset + BinaryReader.LEN_INTEGER;
                            ServiceRequest req = new ServiceRequest();
                            req.setService(SerializableObject.bytesToString(data,offset, len, International.CHARSET));
                            offset = offset + len;
                            len = SerializableObject.bytesToInt(data,offset);
                            offset = offset + BinaryReader.LEN_INTEGER;
                            req.setParameterTypes(byte[].class);
                            req.setMethod(SerializableObject.bytesToString(data,offset, len, International.CHARSET));
                            // NOTE(review): offset is not advanced past the method name before
                            // the payload is sliced, so the slice starts at the method-name
                            // bytes - confirm this is intended and how copyOfRange interprets
                            // its third argument.
                            req.setParams(new Object[]{SerializableObject.copyOfRange(data, offset,data.length - offset)});
                            byte[] result = (byte[])IocManager.invoke(req.getService(),req);
                            sendData(ctx.channel(),OperationDefine.RES_RESULT,uuid,result);
                        } else if (operation == OperationDefine.REQ_ASYNC_SERVICE.value()) {
                            // Asynchronous raw data transfer.
                            // NOTE(review): this condition repeats the REQ_ASYNC_SERVICE check
                            // above, so this branch can never run. It looks like it was meant
                            // to test a separate asynchronous send-data operation code -
                            // confirm against OperationDefine.
                            int len = SerializableObject.bytesToInt(data,offset);
                            offset = offset + BinaryReader.LEN_INTEGER;
                            ServiceRequest req = new ServiceRequest();
                            req.setService(SerializableObject.bytesToString(data,offset, len, International.CHARSET));
                            offset = offset + len;
                            len = SerializableObject.bytesToInt(data,offset);
                            offset = offset + BinaryReader.LEN_INTEGER;
                            req.setParameterTypes(byte[].class);
                            req.setMethod(SerializableObject.bytesToString(data,offset, len, International.CHARSET));
                            req.setParams(new Object[]{SerializableObject.copyOfRange(data, offset,data.length - offset)});
                            sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
                            IocManager.invoke(req.getService(),req);
                        } else if (operation == OperationDefine.REQ_SERVICES_STATUS.value()) {
                            sendData(ctx.channel(),OperationDefine.RES_SERVICES_STATUS,uuid,Convert.convertToBytes(ServiceStatus.getServices()));
                        } else if (operation == OperationDefine.REQ_STOP.value()) {
                            ServiceServer.stop();
                            sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
                        } else {
                            LogManager.errorLog(this.getClass(), new ServerException("接收到非法数据,非ByteBuf数据,operation=" + operation));
                        }
                    } catch (ServerException e) {
                        addError();
                        sendException(ctx.channel(),OperationDefine.RES_SERVICES_ERROR,uuid,e);
                        LogManager.errorLog(this.getClass(), e);
                    } catch (ServiceException e) {
                        addError();
                        sendException(ctx.channel(),OperationDefine.RES_ERROR,uuid,e);
                        LogManager.errorLog(this.getClass(), e);
                    } catch (Throwable e) {
                        addError();
                        sendException(ctx.channel(),OperationDefine.RES_ERROR,uuid,e);
                        LogManager.errorLog(this.getClass(), new ServerException("接收到非法数据,非ByteBuf数据",e));
                    } finally {
                        Object startTime = ContextManager.getValue(key);
                        if (startTime instanceof Long
                                && System.currentTimeMillis() - (Long) startTime > config.getServiceTimeOut()) {
                            // Several pool threads may get here at once and ++ on a
                            // long is not atomic, so serialize the increment.
                            synchronized (ServiceServerNettyHandler.class) {
                                timeOut++;
                            }
                        }
                        ContextManager.remove(key);
                        subDoing();
                    }
                }
            });
        } catch (Throwable e) {
            // Reached when copying the bytes or submitting the task fails; no request
            // UUID has been parsed at this point, so a random one is used in the reply.
            addError();
            sendException(ctx.channel(),OperationDefine.RES_ERROR,UUID.randomUUID().toString(),e);
            LogManager.errorLog(this.getClass(), new ServerException("接收到非法数据,readBytes出错",e));
        } finally {
            // The payload was copied (or the read failed); release the buffer manually.
            buf.release();
        }
    }
    
    
    
    /**
     * Writes an error response to the channel.
     *
     * <p>The throwable is rendered with {@code LogManager.exceptionToString} and,
     * if that yields text, encoded with {@code International.CHARSET} and sent as
     * the response body; otherwise the response is sent without a body. Any
     * exception raised while encoding or sending is logged, not propagated.
     *
     * @param channel channel to write to
     * @param operation response operation code
     * @param uuid request identifier placed in the response header
     * @param e throwable to report to the client
     */
    public static void sendException(Channel channel, OperationDefine operation,String uuid,Throwable e){
        final String text = LogManager.exceptionToString(e);
        try {
            byte[] payload = null;
            if (text != null) {
                payload = text.getBytes(International.CHARSET);
            }
            sendData(channel, operation, uuid, payload);
        } catch (Exception sendFailure) {
            LogManager.errorLog(ServiceServerNettyHandler.class, new ServerException("sendException", sendFailure));
        }
    }
    
    /**
     * Writes a response frame to the channel: one operation byte, the UUID bytes,
     * then the optional body.
     *
     * <p>Nothing is written (and nothing is reported) when the channel is
     * {@code null}, closed, inactive or not currently writable. A failed write
     * is logged by a listener on the write future.
     *
     * @param channel channel to write to; may be {@code null}
     * @param operation response operation code
     * @param uuid request identifier, encoded with {@code International.CHARSET}
     * @param data response body, or {@code null} for a header-only frame
     * @throws UnsupportedEncodingException if the UUID cannot be encoded
     */
    public static void sendData(Channel channel, OperationDefine operation,String uuid,byte[] data) throws UnsupportedEncodingException{
        if (channel == null || !channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
            return;
        }
        // Evaluate everything that can throw BEFORE allocating the buffer, so a
        // failure here cannot leak a reference-counted ByteBuf.
        int opCode = operation.value();
        byte[] uuidBytes = uuid.getBytes(International.CHARSET);

        int len = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
        if (data != null) {
            len = len + data.length;
        }
        ByteBufAllocator alloc = channel.alloc();
        ByteBuf buf = alloc.buffer(len);
        buf.writeByte(opCode);
        buf.writeBytes(uuidBytes);
        if (data != null) {
            buf.writeBytes(data);
        }
        channel.writeAndFlush(buf).addListener(
            new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (!f.isSuccess()) {
                        LogManager.errorLog(ServiceServerNettyHandler.class, "Message sending failed",f.cause());
                    }
                }
            });
    }
    
    /**
     * Logs an exception raised in the pipeline and disconnects the client.
     *
     * @param ctx channel context to disconnect
     * @param cause the exception that reached this handler
     * @throws Exception declared by the overridden method
     * @see ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Record why the connection is being dropped; otherwise the cause is lost.
        LogManager.errorLog(ServiceServerNettyHandler.class, "Channel exception caught, disconnecting client", cause);
        ctx.disconnect();
    }

    /**
     * Flushes the channel context once the current read batch has completed.
     *
     * @param ctx channel context to flush
     * @see ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   		ctx.flush();
    }

    /**
     * Called when the channel becomes inactive, e.g. after the client has
     * disconnected and no more data can be exchanged; closes the channel context.
     *
     * @param ctx channel context to close
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }
    
    /**
     * Heartbeat check: closes the connection when a reader-idle event arrives.
     * Every other event is ignored.
     *
     * @param ctx channel context to close on reader idle
     * @param evt user event delivered by the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent idleEvent = (IdleStateEvent) evt;
        if (idleEvent.state() == IdleState.READER_IDLE) {
            ctx.close();
        }
    }


    
    
}
